package com.flink.examples.kafka;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * Example Flink job that serializes one sample {@link TUser} to JSON and writes it to Kafka.
 *
 * <p>Official connector documentation:
 * https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html
 *
 * @Description Writes producer data to Kafka
 * @Author JL
 * @Date 2020/08/19
 * @Version V1.0
 */
public class DataStreamSink {

    /** Kafka broker address the example producer connects to. */
    private static final String BOOTSTRAP_SERVERS = "192.168.110.35:9092";

    /** Kafka topic the example record is written to. */
    private static final String TOPIC = "test";

    /** Name under which the job is submitted. */
    private static final String JOB_NAME = "flink kafka sink";

    /**
     * Shared JSON serializer, reused for every record instead of allocating a new
     * {@link Gson} per element inside the map function.
     */
    private static final Gson GSON = new Gson();

    /**
     * Builds and runs the pipeline: one sample user -> JSON string -> Kafka sink and stdout.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        configureEnvironment(env);

        // 1. Connect to Kafka.
        FlinkKafkaProducer<String> producer = createProducer();

        // 2. Create the data and feed it into the stream as JSON.
        // A lambda (not a bound method reference such as GSON::toJson) is used on purpose:
        // it reads the static field when invoked instead of capturing the Gson instance
        // inside the function object that Flink ships to the workers.
        DataStream<String> sourceStream = env
                .fromElements(sampleUser())
                .map((MapFunction<TUser, String>) value -> GSON.toJson(value));

        // 3. Send the stream to Kafka, and also print it for inspection.
        sourceStream.addSink(producer);
        sourceStream.print();
        env.execute(JOB_NAME);
    }

    /** Applies the parallelism and checkpointing settings used by this example. */
    private static void configureEnvironment(StreamExecutionEnvironment env) {
        // Original author's note: parallelism must be set, otherwise nothing is output.
        env.setParallelism(1);
        // Start a checkpoint every 2000 ms.
        env.enableCheckpointing(2000);
        // Use exactly-once checkpointing mode.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Leave at least 500 ms of progress between checkpoints.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // A checkpoint must complete within one minute or it is discarded.
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow only one checkpoint in flight at a time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    }

    /**
     * Creates the Kafka producer sink that writes plain strings to {@link #TOPIC}.
     *
     * <p>NOTE(review): this three-argument constructor appears to default to at-least-once
     * delivery in the 1.11 connector, so exactly-once checkpointing alone would not make the
     * Kafka writes transactional - confirm, and pass an explicit semantic if that is required.
     */
    private static FlinkKafkaProducer<String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        return new FlinkKafkaProducer<>(TOPIC, new SimpleStringSchema(), props);
    }

    /** Builds the fixed sample record emitted by this example. */
    private static TUser sampleUser() {
        TUser user = new TUser();
        user.setId(8);
        user.setName("liu3");
        user.setAge(22);
        user.setSex(1);
        user.setAddress("CN");
        user.setCreateTimeSeries(1598889600000L);
        return user;
    }

}
